package com.shujuwuliankeji.ems.cloud.module.bussiness.kafka;

/**
 * 
 * @author Fanzhongjie
 * @date 2023年1月29日 上午11:23:52
 * @tips TODO
 */
public class KafkaTest {

//	public static void main(String[] args) throws Exception {
//		Properties props = new Properties();
//		// 服务地址
//		props.setProperty("bootstrap.servers", "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092");
//		// 分组名称
//		props.setProperty("group.id", "bussiness");
//		// earliest:当各分区下有已提交的offset时，从提交的offset开始消费；无提交的offset时，从头开始消费
//
//		// latest:当各分区下有已提交的offset时，从提交的offset开始消费；无提交的offset时，消费新产生的该分区下的数据
//
//		// none:topic各分区都存在已提交的offset时，从offset后开始消费；只要有一个分区不存在已提交的offset，则抛出异常
//
//		props.setProperty("auto.offset.reset", "latest");
//
//		// key序列化方式
//		props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//		// value序列化方式
//		props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//		// 鉴权配置 username：用户名称，开通后平台分配，password：用户密码，开通后平台分配,实例配置信息页面查看
//		props.put("sasl.jaas.config",
//				"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"YuhRypcV\" password=\"WuqKnuDTRIybzUNM\";");
//		props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
//		props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
//		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//		consumer.subscribe(Arrays.asList("xyZt_test"));
//		while (true) {
//			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
//			for (ConsumerRecord<String, String> record : records) {
//
//				// 数据处理
//				System.out.printf("partition =%d offset = %d, key = %s, value = %s%n", record.partition(),
//						record.offset(), record.key(), record.value());
//			}
//		}
//	}

}
